home
***
CD-ROM
|
disk
|
FTP
|
other
***
search
/
PC World Komputer 2010 April
/
PCWorld0410.iso
/
hity wydania
/
Ubuntu 9.10 PL
/
karmelkowy-koliberek-9.10-netbook-remix-PL.iso
/
casper
/
filesystem.squashfs
/
usr
/
share
/
pyshared
/
glchess
/
ai.py
< prev
next >
Wrap
Text File
|
2009-09-22
|
14KB
|
493 lines
# -*- coding: utf-8 -*-
__author__ = 'Robert Ancell <bob27@users.sourceforge.net>'
__license__ = 'GNU General Public License Version 2'
__copyright__ = 'Copyright 2005-2006 Robert Ancell'
import os
import sys
import select
import signal
import xml.dom.minidom
import xml.parsers.expat
import game
import cecp
import uci
from defaults import *
CECP = 'CECP'
UCI = 'UCI'
class Option:
"""
"""
value = ''
class Level:
"""
"""
def __init__(self):
self.options = []
class Profile:
"""
"""
def __init__(self):
self.name = ''
self.protocol = ''
self.path = ''
self.executables = []
self.arguments = []
self.profiles = {}
def detect(self):
"""
"""
try:
path = os.environ['PATH'].split(os.pathsep)
except KeyError:
path = []
for directory in path:
for executable in self.executables:
b = directory + os.sep + executable
if os.path.isfile(b):
self.path = b
return
self.path = None
def _getXMLText(node):
"""
"""
if len(node.childNodes) == 0:
return ''
if len(node.childNodes) > 1 or node.childNodes[0].nodeType != node.TEXT_NODE:
raise ValueError
return node.childNodes[0].nodeValue
def _loadLevel(node):
"""
"""
level = Level()
n = node.getElementsByTagName('name')
if len(n) != 1:
return None
level.name = _getXMLText(n[0])
for e in node.getElementsByTagName('option'):
option = Option()
option.value = _getXMLText(e)
try:
attribute = e.attributes['name']
except KeyError:
pass
else:
option.name = _getXMLText(attribute)
level.options.append(option)
return level
def loadProfiles():
"""
"""
profiles = []
fileNames = [os.path.expanduser(LOCAL_AI_CONFIG), os.path.join(BASE_DIR, 'ai.xml'), 'ai.xml']
document = None
for f in fileNames:
try:
document = xml.dom.minidom.parse(f)
except IOError:
pass
except xml.parsers.expat.ExpatError:
print 'AI configuration from %s is invalid, ignoring' % f
else:
#print 'Loading AI configuration from %s' % f
break
if document is None:
print 'WARNING: No AI configuration'
return profiles
elements = document.getElementsByTagName('aiconfig')
if len(elements) == 0:
return profiles
for e in elements:
for p in e.getElementsByTagName('ai'):
try:
protocolName = p.attributes['type'].nodeValue
except KeyError:
assert(False)
if protocolName == 'cecp':
protocol = CECP
elif protocolName == 'uci':
protocol = UCI
else:
assert(False), 'Uknown AI type: %s' % repr(protocolName)
n = p.getElementsByTagName('name')
assert(len(n) > 0)
name = _getXMLText(n[0])
executables = []
n = p.getElementsByTagName('binary')
assert(len(n) > 0)
for x in n:
executables.append(_getXMLText(x))
arguments = []
for x in p.getElementsByTagName('argument'):
arguments.append(_getXMLText(x))
levels = {}
for x in p.getElementsByTagName('level'):
level = _loadLevel(x)
if level is not None:
levels[level.name] = level
profile = Profile()
profile.name = name
profile.protocol = protocol
profile.executables = executables
profile.arguments = arguments
profile.levels = levels
profiles.append(profile)
return profiles
class CECPConnection(cecp.Connection):
"""
"""
def __init__(self, player):
"""
"""
self.player = player
cecp.Connection.__init__(self)
def onOutgoingData(self, data):
"""Called by cecp.Connection"""
self.player.logText(data, 'output')
self.player.sendToEngine(data)
def onMove(self, move):
"""Called by cecp.Connection"""
if self.player.isReadyToMove():
self.player.moving = True
self.player.move(move)
else:
assert(self.player.suppliedMove is None)
self.player.suppliedMove = move
def onResign(self):
"""Called by cecp.Connection"""
self.player.resign()
def onDraw(self):
"""Called by cecp.Connection"""
print self.player.claimDraw()
def logText(self, text, style):
"""Called by cecp.Connection"""
self.player.logText(text, style)
class UCIConnection(uci.StateMachine):
"""
"""
def __init__(self, player):
"""
"""
self.player = player
uci.StateMachine.__init__(self)
def onOutgoingData(self, data):
"""Called by uci.StateMachine"""
self.player.logText(data, 'output')
self.player.sendToEngine(data)
def logText(self, text, style):
"""Called by uci.StateMachine"""
self.player.logText(text, style)
def onMove(self, move):
"""Called by uci.StateMachine"""
self.player.move(move)
# Catch zombie processes
def _cDied(sig, stackFrame):
try:
(pid, status) = os.waitpid(-1, os.WNOHANG)
except OSError:
pass
signal.signal(signal.SIGCHLD, _cDied)
class Player(game.ChessPlayer):
"""
"""
def __init__(self, name, profile, level = 'normal'):
"""Constructor for an AI player.
'name' is the name of the player (string).
'profile' is the profile to use for the AI (Profile).
'level' is the difficulty level to use (string).
"""
self.__profile = profile
self.__level = level
self.moving = False
self.suppliedMove = None
game.ChessPlayer.__init__(self, name)
# Pipe to communicate to engine with
(toManagerOutput, toManagerInput) = os.pipe()
(fromManagerOutput, fromManagerInput) = os.pipe()
# Store the file descripter for reading/writing
self.__toEngineFd = toManagerInput
self.__fromEngineFd = fromManagerOutput
# Fork off a child process to manage the engine
try:
self.__pid = os.fork()
except OSError, e:
print 'Monitor failed to fork: %s' % e.message
os.close(toManagerInput)
os.close(toManagerOutput)
os.close(fromManagerInput)
os.close(fromManagerOutput)
self.__toEngineFd = None
self.__fromEngineFd = None
else:
if self.__pid == 0:
os.close(toManagerInput)
os.close(fromManagerOutput)
self._runMonitor(fromManagerInput, toManagerOutput)
os.close(toManagerOutput)
os.close(fromManagerInput)
os._exit(0)
os.close(toManagerOutput)
os.close(fromManagerInput)
if profile.protocol == CECP:
self.connection = CECPConnection(self)
elif profile.protocol == UCI:
self.connection = UCIConnection(self)
else:
assert(False)
self.connection.start()
self.connection.startGame()
try:
level = self.__profile.levels[self.__level]
except KeyError:
self.connection.configure()
else:
self.connection.configure(level.options)
# Methods to extend
def logText(self, text, style):
"""
"""
pass
# Public methods
def getProfile(self):
"""Get the AI profile this AI is using.
Returns a 2-tuple containing the profile name (str) and the difficulty level (str).
"""
return (self.__profile.name, self.__level)
def fileno(self):
"""Returns the file descriptor for communicating with the engine (integer)"""
return self.__fromEngineFd
def read(self):
"""Read and process data from the engine.
This is non-blocking.
"""
while True:
# Connection was closed
if self.__fromEngineFd == None:
return False
# Check if data is available
(rlist, _, xlist) = select.select([self.__fromEngineFd], [], [self.__fromEngineFd], 0)
if (len(rlist) + len(xlist)) == 0:
return True
# Read a chunk and process
try:
data = os.read(self.__fromEngineFd, 256)
except OSError, e:
print 'Error reading from chess engine: ' + str(e)
self._die()
return False
if len(data) == 0:
print 'Engine has died'
self._die()
return False
self.connection.registerIncomingData(data)
def sendToEngine(self, data):
"""
"""
if self.__toEngineFd == None:
return
try:
os.write(self.__toEngineFd, data)
except OSError, e:
print 'Failed to write to engine: ' + str(e)
def quit(self):
"""Disconnect the AI"""
fd = self.__toEngineFd
self.__toEngineFd = None
self.__fromEngineFd = None
# Send quit
try:
if fd is not None:
os.write(fd, '\nquit\n') # FIXME: CECP specific
except OSError:
return
# Extended methods
def onPlayerMoved(self, player, move):
"""Called by game.ChessPlayer"""
if self.__toEngineFd == None:
return
isSelf = player is self and self.moving
self.moving = False
self.connection.reportMove(move.canMove, isSelf)
def onUndoMove(self):
"""Called by game.ChessPlayer"""
self.connection.undoMove()
def readyToMove(self):
"""Called by game.ChessPlayer"""
if self.__toEngineFd == None:
self.die()
return
# AI Engines always claim draw due to three-fold-repetition and
# 50 move rule
if self.claimDraw():
return
game = self.getGame()
whiteTime = game.getWhite().getRemainingTime()
blackTime = game.getBlack().getRemainingTime()
if game.getWhite() is self:
ownTime = whiteTime
else:
ownTime = blackTime
if self.suppliedMove is None:
self.connection.requestMove(whiteTime, blackTime, ownTime)
else:
self.moving = True
move = self.suppliedMove
self.suppliedMove = None
self.move(move)
# AI Engines always claim draw due to three-fold-repetition and
# 50 move rule
self.claimDraw()
def onGameEnded(self, game):
"""Called by game.ChessPlayer"""
self.quit()
def _die(self):
self.quit()
self.die()
def _runEngine(self, toEngineFd, fromEngineFd):
# Make the engine low priority for CPU usage
os.nice(19)
# Change directory so any log files are not in the users home directory
try:
os.mkdir(LOG_DIR)
except OSError:
pass
try:
os.chdir(LOG_DIR)
except OSError:
pass
# Connect stdin, stdout and stderr to the manager process
os.dup2(toEngineFd, sys.stdin.fileno())
os.dup2(fromEngineFd, sys.stdout.fileno())
os.dup2(fromEngineFd, sys.stderr.fileno())
# Execute the engine
try:
os.execv(self.__profile.path, [self.__profile.path] + self.__profile.arguments)
except OSError:
pass
def _runMonitor(self, toApplicationFd, fromApplicationFd):
# Make pipes to the child process
(toEngineOutput, toEngineInput) = os.pipe()
(fromEngineOutput, fromEngineInput) = os.pipe()
# Fork and execute the child
try:
enginePID = os.fork()
except OSError, e:
print 'Monitor failed to fork: %s' % e.message
os._exit(1);
if enginePID == 0:
os.close(toApplicationFd)
os.close(fromApplicationFd)
os.close(toEngineInput)
os.close(fromEngineOutput)
self._runEngine(toEngineOutput, fromEngineInput)
os._exit(0)
else:
os.close(toEngineOutput)
os.close(fromEngineInput)
# Forward data between the application and the child process and wait for closed pipes
inputPipes = (fromApplicationFd, fromEngineOutput)
targets = {fromApplicationFd: toEngineInput,
fromEngineOutput: toApplicationFd}
pipes = (toApplicationFd, fromApplicationFd,
toEngineInput, fromEngineOutput)
try:
while True:
# Wait for data
(rfds, _, xfds) = select.select(inputPipes, [], pipes, None)
for fd in rfds:
data = os.read(fd, 65535)
if len(data) == 0:
raise OSError('End of data')
# Bridge information between the application and the engine
os.write(targets[fd], data)
except:
# Kill the child and exit
try:
os.kill(enginePID, signal.SIGQUIT)
except OSError:
pass
os._exit(0)